#!/usr/bin/env python3
#-*- coding:utf-8 -*-
import pymysql
import threading
from taosrest import connect, TaosRestConnection, TaosRestCursor
import time

con_mysql = pymysql.connect(host='127.0.0.1',
                        database='location', 
                        user='root', 
                        password='password',
                        charset='utf8')

con_taosrest: TaosRestConnection = connect(url="http://127.0.0.1:6041",
                        user="root",
                        password="password",
                        timeout=300)

step = 10000
taoscnt = 0
start_time = time.time()
lock = threading.RLock()
"""
DROP TABLE IF EXISTS location.location;
CREATE STABLE location.location ( \
    locationtime TIMESTAMP, \
    lat DOUBLE, \
    lng DOUBLE, \
    speed DOUBLE) \
    TAGS (loginname NCHAR(50), \
      username NCHAR(50), \
      deviceimei NCHAR(50), \
      devicemodel NCHAR(100), \
      devicetype BOOL);
"""

class scheduler():
    def __init__(self):
        self.start = 0
        self.sql_text = 'SELECT Id FROM location.location ORDER BY Id DESC LIMIT 1;'
        cursor_mysql = con_mysql.cursor(cursor=pymysql.cursors.DictCursor)
        lock.acquire()
        cursor_mysql.execute(self.sql_text)
        self.maxsteplines = cursor_mysql.fetchall()
        lock.release()
        cursor_mysql.close()
        self.maxstep = self.maxsteplines[0]['Id']

    def getfetchid(self):
        global step
        while self.start <= self.maxstep:
            self.start += step
            return (self.start)

def processinsert():
    global step
    global taoscnt
    global start_time
    startid = scheduler.getfetchid()
    endid = startid + step
    while startid:
        endid = startid + step
        cursor_mysql = con_mysql.cursor()
        lock.acquire()
        cursor_mysql.execute('SELECT CONCAT(\'t\',LoginName,\'_\',DeviceIMEI) AS tbname , \
            LoginName, \
            MAX(UserName) AS UserName, \
            DeviceIMEI, \
            IFNULL(MAX(DeviceModel),\'\') AS DeviceModel, \
            IFNULL(MAX(DeviceType),\'\') AS DeviceType\
            FROM location.location \
            WHERE Id > %s \
            AND Id <= %s \
            GROUP BY LoginName,DeviceIMEI;' % (startid,endid))
        tbnamecur = cursor_mysql.fetchall()
        lock.release()
        cursor_mysql.close()

        for tbname,loginname,username,deviceimei,devicemodel,devicetype in tbnamecur:
            cursor_mysql = con_mysql.cursor(cursor=pymysql.cursors.DictCursor)
            lock.acquire()
            cursor_mysql.execute(f'SELECT CONCAT(\'t\',LoginName,\'_\',DeviceIMEI) AS tbname, \
              LocationTime, \
              Lat, \
              Lng, \
              Speed \
            FROM location.location \
            WHERE LoginName = \"%s\" \
              AND DeviceIMEI = \"%s\" \
              AND Id > %d \
              AND Id <= %d;' % (loginname,deviceimei,startid,endid))
            submission = cursor_mysql.fetchall()
            lock.release()
            cursor_mysql.close()
            sql="INSERT INTO location.`%s` USING location.location TAGS (\'%s\',\'%s\',\'%s\',\'%s\',%s) VALUES ".replace("''" ,"Null" ) % (tbname,loginname,username,deviceimei,devicemodel,devicetype)
            if submission:
                for row in submission:
                    sql += '(\'{}\',{},{},{}) '.format(row["LocationTime"],row["Lat"],row["Lng"],row["Speed"])
                sql += ";"
                print(sql)
                cursor_taosrest: TaosRestCursor = con_taosrest.cursor()
                #lock.acquire()
                cursor_taosrest.execute(sql)
                taoscnt += cursor_taosrest.rowcount
                #lock.release()
                cursor_taosrest.close()
                stime = time.time() - start_time
                avgrow = int(taoscnt / stime)
                print("数据已迁移%d行,耗时%d秒,平均行%d/秒" % (taoscnt,stime,avgrow))
        startid = scheduler.getfetchid()

def threads_scheduler(threads_num):
    global start_time
    threads = []
    for i in range(threads_num):
        td = threading.Thread(target=processinsert, name='th'+str(i+1))
        threads.append(td)
    for t in threads:
        t.setDaemon(True)
        t.start()
        #t.join()
        #for t in threads:
            #t.join()
    while threading.active_count() !=1:
        pass
    else:
        end_time = time.time()
        deltatime = end_time - start_time
        totalhour = int(deltatime / 3600)
        totalminute = int((deltatime - totalhour * 3600) / 60)
        totalsecond = int(deltatime - totalhour * 3600 - totalminute * 60)
        print("数据全部迁移完毕,总计耗时:%d小时%d分%d秒!" %(totalhour, totalminute, totalsecond))  
if __name__=='__main__':
    scheduler = scheduler()
    threads_scheduler(8)